{
|
||
|
|
"cells": [
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "207aa954",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"## Train Machine Learning Models using Amazon Keyspaces as a Data Source \n"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"attachments": {},
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"---\n",
|
||
|
|
"\n",
|
||
|
|
"This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. \n",
|
||
|
|
"\n",
"---"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "207aa954",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"\n",
|
||
|
|
"Contributors\n",
|
||
|
|
"- `Vadim Lyakhovich (AWS)`\n",
|
||
|
|
"- `Ram Pathangi (AWS)`\n",
|
||
|
|
"- `Parth Patel (AWS)`\n",
|
||
|
|
"- `Arvind Jain (AWS)`\n",
|
||
|
|
"\n",
|
||
|
|
"\n",
|
||
|
|
"*Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved* \n",
|
||
|
|
"[*SPDX-License-Identifier: MIT-0*](https://github.com/aws/mit-0)\n"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "2d10bdcf",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"### Prerequisites\n",
|
||
|
|
"\n",
|
||
|
|
"The notebook execution role must include permissions to access Amazon Keyspaces and to assume the role.\n",
|
||
|
|
"\n",
|
||
|
|
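"* To access the Amazon Keyspaces database, use the `AmazonKeyspacesReadOnlyAccess` or `AmazonKeyspacesFullAccess` managed policy. Because this notebook creates a keyspace and tables, writes rows, and drops the keyspace at the end, read-only access is not sufficient to run it end to end. Follow the principle of least privilege when running in production.\n",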
|
||
|
|
"\n",
|
||
|
|
"For more information, see\n",
|
||
|
|
"[`AWS Identity and Access Management for Amazon Keyspaces`](https://docs.aws.amazon.com/keyspaces/latest/devguide/security-iam.html).\n",
|
||
|
|
"\n",
|
||
|
|
"* To assume the role, the role needs permission for the [`sts:AssumeRole`](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) action, for example:\n",
|
||
|
|
" ```json\n",
|
||
|
|
" {\n",
|
||
|
|
" \"Version\": \"2012-10-17\", \n",
|
||
|
|
" \"Statement\": [ \n",
|
||
|
|
" { \n",
|
||
|
|
" \"Action\": [ \n",
|
||
|
|
" \"sts:AssumeRole\" \n",
|
||
|
|
" ], \n",
|
||
|
|
" \"Effect\": \"Allow\", \n",
|
||
|
|
" \"Resource\": \"*\" \n",
|
||
|
|
" }\n",
|
||
|
|
" ]\n",
|
||
|
|
" }\n",
|
||
|
|
" ```\n",
|
||
|
|
"\n",
|
||
|
|
"#### Note:\n",
|
||
|
|
"Amazon Keyspaces is available in the following [AWS Regions](https://docs.aws.amazon.com/keyspaces/latest/devguide/programmatic.endpoints.html).\n",
|
||
|
|
"\n",
|
||
|
|
"This notebook was tested with the `conda_python3` kernel and should work with Python 3.x."
|
||
|
|
]
|
||
|
|
},
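{
"cell_type": "markdown",
"id": "c7a1e0f1",
"metadata": {},
"source": [
"The managed policies above are broad. As a starting point for a least-privilege setup, the sketch below builds an identity-based policy scoped to a single keyspace. It is a sketch under stated assumptions, not an official template: it assumes the Amazon Keyspaces resource ARN format `arn:aws:cassandra:<region>:<account>:/keyspace/<name>/` and the IAM actions `cassandra:Create`, `cassandra:Modify`, `cassandra:Select`, and `cassandra:Drop`, so verify both against the Amazon Keyspaces IAM documentation before use. The function `keyspace_policy` and the account ID `111122223333` are hypothetical.\n",
"\n",
"```python\n",
"import json\n",
"\n",
"\n",
"def keyspace_policy(region: str, account_id: str, keyspace: str) -> dict:\n",
"    \"\"\"Build a hypothetical least-privilege IAM policy for one keyspace.\"\"\"\n",
"    # Assumed ARN format: arn:aws:cassandra:<region>:<account>:/keyspace/<name>/\n",
"    prefix = f\"arn:aws:cassandra:{region}:{account_id}:/keyspace\"\n",
"    return {\n",
"        \"Version\": \"2012-10-17\",\n",
"        \"Statement\": [\n",
"            {\n",
"                # Create and drop the keyspace and its tables, write rows, and read them back.\n",
"                \"Effect\": \"Allow\",\n",
"                \"Action\": [\"cassandra:Create\", \"cassandra:Modify\", \"cassandra:Select\", \"cassandra:Drop\"],\n",
"                \"Resource\": [f\"{prefix}/{keyspace}/\", f\"{prefix}/{keyspace}/table/*\"],\n",
"            },\n",
"            {\n",
"                # Read-only access to system tables such as system_schema.keyspaces.\n",
"                \"Effect\": \"Allow\",\n",
"                \"Action\": [\"cassandra:Select\"],\n",
"                \"Resource\": [f\"{prefix}/system*\"],\n",
"            },\n",
"        ],\n",
"    }\n",
"\n",
"\n",
"print(json.dumps(keyspace_policy(\"us-west-2\", \"111122223333\", \"blog_20240101\"), indent=2))\n",
"```"
]
},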
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "29439d73",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"In this notebook:\n",
|
||
|
|
"\n",
|
||
|
|
"1. First, we install the SigV4 authentication plugin for the Cassandra Python driver, which we use to connect to Amazon Keyspaces.\n",
|
||
|
|
"\n",
|
||
|
|
"> The Amazon Keyspaces SigV4 authentication plugin for Cassandra client drivers enables you to authenticate calls to Amazon Keyspaces ***using IAM access keys instead of a username and password***. To learn more about how the Amazon Keyspaces SigV4 plugin enables [`IAM users, roles, and federated identities`](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) to authenticate in Amazon Keyspaces API requests, see [`AWS Signature Version 4 process (SigV4)`](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html).\n",
|
||
|
|
"\n",
|
||
|
|
"2. Next, we establish a connection to Amazon Keyspaces.\n",
"3. Next, we create a new keyspace, ***blog_(yyyymmdd)***, and a new table, ***online_retail***.\n",
"4. Next, we download retail data about customers.\n",
"5. Next, we ingest the retail data into the Keyspaces table.\n",
"6. Next, we use a notebook in SageMaker Studio to read the data from the Keyspaces table and prepare it for training with the k-means clustering algorithm. Many customers use SageMaker Studio for end-to-end development of ML use cases; you can use this notebook as a starting point, customize it for your use case, and share it with collaborators without requiring them to install any additional software.\n",
"7. Next, we train a clustering model on the prepared data.\n",
"8. After training is complete, we map each customer to their cluster, view the cluster sizes, and save the results back to Amazon Keyspaces.\n",
"9. Finally, we clean up by dropping the keyspace, and with it the tables, to avoid future charges.\n",
|
||
|
|
"\n"
|
||
|
|
]
|
||
|
|
},
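{
"cell_type": "markdown",
"id": "c7a1e0f2",
"metadata": {},
"source": [
"To make the SigV4 idea more concrete: the client never sends the secret access key. It derives a signing key from the secret, the date, the Region, and the service name through a chain of HMAC-SHA256 operations, and uses that key to sign requests. The sketch below shows only this key-derivation step as described in the AWS Signature Version 4 documentation. It assumes `cassandra` is the signing service name for Amazon Keyspaces, and the helper names are hypothetical; the `cassandra-sigv4` plugin performs the full signing for you.\n",
"\n",
"```python\n",
"import hashlib\n",
"import hmac\n",
"\n",
"\n",
"def _hmac_sha256(key: bytes, msg: str) -> bytes:\n",
"    return hmac.new(key, msg.encode(\"utf-8\"), hashlib.sha256).digest()\n",
"\n",
"\n",
"def sigv4_signing_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:\n",
"    \"\"\"Derive a SigV4 signing key; date_stamp is formatted as YYYYMMDD.\"\"\"\n",
"    k_date = _hmac_sha256((\"AWS4\" + secret_key).encode(\"utf-8\"), date_stamp)\n",
"    k_region = _hmac_sha256(k_date, region)\n",
"    k_service = _hmac_sha256(k_region, service)\n",
"    return _hmac_sha256(k_service, \"aws4_request\")\n",
"\n",
"\n",
"# Dummy secret: the derived key is scoped to one day, one Region, and one service.\n",
"key = sigv4_signing_key(\"EXAMPLE-SECRET\", \"20240101\", \"us-west-2\", \"cassandra\")\n",
"print(key.hex())\n",
"```"
]
},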
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "17cd92c7",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Install missing packages and import dependencies\n",
|
||
|
|
"\n",
|
||
|
|
"# Install the Cassandra SigV4 authentication plugin\n",
|
||
|
|
"%pip install cassandra-sigv4\n",
|
||
|
|
"\n",
|
||
|
|
"# Download the Starfield root certificate used to verify the Amazon Keyspaces TLS endpoint\n",
|
||
|
|
"!curl https://certs.secureserver.net/repository/sf-class2-root.crt -O\n",
|
||
|
|
"\n",
|
||
|
|
"# Import\n",
|
||
|
|
"from sagemaker import get_execution_role\n",
|
||
|
|
"from cassandra.cluster import Cluster\n",
|
||
|
|
"from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED\n",
|
||
|
|
"from cassandra_sigv4.auth import SigV4AuthProvider\n",
|
||
|
|
"import boto3\n",
|
||
|
|
"\n",
|
||
|
|
"import pandas as pd\n",
|
||
|
|
"from pandas import DataFrame\n",
|
||
|
|
"\n",
|
||
|
|
"import csv\n",
|
||
|
|
"from cassandra import ConsistencyLevel\n",
|
||
|
|
"from datetime import datetime\n",
|
||
|
|
"import time\n",
|
||
|
|
"from datetime import timedelta\n",
|
||
|
|
"\n",
|
||
|
|
"import datetime as dt\n",
|
||
|
|
"import seaborn as sns\n",
|
||
|
|
"import matplotlib.pyplot as plt\n",
|
||
|
|
"from sklearn.cluster import KMeans\n",
|
||
|
|
"from sklearn.preprocessing import MinMaxScaler\n",
|
||
|
|
"\n",
|
||
|
|
"# Getting credentials from the role\n",
|
||
|
|
"client = boto3.client(\"sts\")\n",
|
||
|
|
"\n",
|
||
|
|
"# Get notebook Role\n",
|
||
|
|
"role = get_execution_role()\n",
|
||
|
|
"role_info = {\"RoleArn\": role, \"RoleSessionName\": \"session1\"}\n",
|
||
|
|
"print(role_info)\n",
|
||
|
|
"\n",
|
||
|
|
"credentials = client.assume_role(**role_info)"
|
||
|
|
]
|
||
|
|
},
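{
"cell_type": "markdown",
"id": "c7a1e0f3",
"metadata": {},
"source": [
"`assume_role` returns temporary credentials that expire (after one hour by default), and the response's `Credentials` dictionary includes an `Expiration` timestamp. If the load or training steps run for a long time, it can help to check how much time is left before reusing the credentials. A minimal sketch, assuming `Expiration` is a timezone-aware `datetime` as returned by boto3; the helper name and the 5-minute margin are hypothetical choices.\n",
"\n",
"```python\n",
"from datetime import datetime, timedelta, timezone\n",
"from typing import Optional\n",
"\n",
"\n",
"def credentials_expiring(\n",
"    expiration: datetime,\n",
"    margin: timedelta = timedelta(minutes=5),\n",
"    now: Optional[datetime] = None,\n",
") -> bool:\n",
"    \"\"\"Return True if the credentials expire within `margin` of `now` (UTC by default).\"\"\"\n",
"    now = now or datetime.now(timezone.utc)\n",
"    return expiration - now <= margin\n",
"\n",
"\n",
"# In this notebook you might call (assumed usage):\n",
"# credentials_expiring(credentials[\"Credentials\"][\"Expiration\"])\n",
"expires_at = datetime.now(timezone.utc) + timedelta(minutes=30)\n",
"print(credentials_expiring(expires_at))  # False: 30 minutes left, margin is 5\n",
"```"
]
},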
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "aee7e72d",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Connect to Cassandra Database from SageMaker Notebook using temporary credentials from the Role.\n",
|
||
|
|
"session = boto3.session.Session(\n",
|
||
|
|
" aws_access_key_id=credentials[\"Credentials\"][\"AccessKeyId\"],\n",
|
||
|
|
" aws_secret_access_key=credentials[\"Credentials\"][\"SecretAccessKey\"],\n",
|
||
|
|
" aws_session_token=credentials[\"Credentials\"][\"SessionToken\"],\n",
|
||
|
|
")\n",
|
||
|
|
"\n",
|
||
|
|
"region_name = session.region_name\n",
|
||
|
|
"\n",
|
||
|
|
"# Set Context\n",
|
||
|
|
"ssl_context = SSLContext(PROTOCOL_TLSv1_2)\n",
|
||
|
|
"ssl_context.load_verify_locations(\"sf-class2-root.crt\")\n",
|
||
|
|
"ssl_context.verify_mode = CERT_REQUIRED\n",
|
||
|
|
"\n",
|
||
|
|
"auth_provider = SigV4AuthProvider(session)\n",
|
||
|
|
"\n",
|
||
|
|
"keyspaces_host = \"cassandra.\" + region_name + \".amazonaws.com\"\n",
|
||
|
|
"\n",
|
||
|
|
"cluster = Cluster([keyspaces_host], ssl_context=ssl_context, auth_provider=auth_provider, port=9142)\n",
|
||
|
|
"session = cluster.connect()\n",
|
||
|
|
"\n",
|
||
|
|
"\n",
|
||
|
|
"# Read from a Keyspaces system table to verify the connection. Amazon Keyspaces is serverless, so there is no database cluster to provision ahead of time.\n",
|
||
|
|
"r = session.execute(\"select * from system_schema.keyspaces\")\n",
|
||
|
|
"\n",
|
||
|
|
"# Load the result rows into a pandas DataFrame\n",
|
||
|
|
"df = DataFrame(r)\n",
|
||
|
|
"print(df)"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "e14d7fa9",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
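"## Download Sample Data\n",
"For this example, we use a 20,000-row sample of the Online Retail II dataset from the UCI Machine Learning Repository (http://archive.ics.uci.edu/ml) [1], hosted in the `sagemaker-sample-files` S3 bucket.\n",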
|
||
|
|
"\n",
|
||
|
|
"### References ###\n",
|
||
|
|
"`[1] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.`\n"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "c3aa9ea8",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Download sample data\n",
|
||
|
|
"\n",
|
||
|
|
"!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/online_retail/online_retail_II_20k.csv ."
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "717151f6",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"#### In this step, we create a new ***blog_(yyyymmdd)*** keyspace and an ***online_retail*** table\n",
|
||
|
|
"\n",
|
||
|
|
"```\n",
|
||
|
|
"CREATE KEYSPACE IF NOT EXISTS blog_yyyymmdd\n",
|
||
|
|
"WITH\n",
|
||
|
|
" REPLICATION = {'class': 'SingleRegionStrategy'};\n",
|
||
|
|
"\n",
|
||
|
|
"\n",
|
||
|
|
"CREATE TABLE IF NOT EXISTS online_retail ( \n",
|
||
|
|
" invoice\ttext,\n",
|
||
|
|
" stock_code\ttext,\n",
|
||
|
|
" description\ttext,\n",
|
||
|
|
" quantity\tint,\n",
|
||
|
|
" invoice_date\tdate,\n",
|
||
|
|
" price\tdecimal,\n",
|
||
|
|
" customer_id\ttext,\n",
|
||
|
|
" country\ttext,\n",
|
||
|
|
" PRIMARY KEY (invoice,stock_code));\n",
|
||
|
|
" \n",
|
||
|
|
"```"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "98ccf29c",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Create Keyspace\n",
|
||
|
|
"\n",
|
||
|
|
"now = datetime.now()\n",
"# Zero-padded date (e.g. blog_20240105) so the name matches the blog_(yyyymmdd) pattern\n",
"keyspaces_schema = \"blog_\" + now.strftime(\"%Y%m%d\")\n",
|
||
|
|
"\n",
|
||
|
|
"createKeyspace = \"\"\"CREATE KEYSPACE IF NOT EXISTS %s\n",
|
||
|
|
"WITH\n",
|
||
|
|
" REPLICATION = {'class': 'SingleRegionStrategy'}; \"\"\"\n",
|
||
|
|
"cr = session.execute(createKeyspace % keyspaces_schema)\n",
|
||
|
|
"time.sleep(5)  # keyspace creation is asynchronous; wait briefly before creating the table\n",
|
||
|
|
"print(\"Keyspace '\" + keyspaces_schema + \"' created\")\n",
|
||
|
|
"\n",
|
||
|
|
"# Create Table\n",
|
||
|
|
"createTable = \"\"\"CREATE TABLE IF NOT EXISTS %s.online_retail ( \n",
|
||
|
|
" invoice text,\n",
|
||
|
|
" stock_code text,\n",
|
||
|
|
" description text,\n",
|
||
|
|
" quantity int,\n",
|
||
|
|
" invoice_date date,\n",
|
||
|
|
" price decimal,\n",
|
||
|
|
" customer_id text,\n",
|
||
|
|
" country text,\n",
|
||
|
|
" PRIMARY KEY (invoice,stock_code));\n",
|
||
|
|
"\"\"\"\n",
|
||
|
|
"cr = session.execute(createTable % keyspaces_schema)\n",
|
||
|
|
"time.sleep(20)  # table creation is asynchronous; give the table time to become ACTIVE\n",
|
||
|
|
"print(\"Table 'online_retail' created\")"
|
||
|
|
]
|
||
|
|
},
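{
"cell_type": "markdown",
"id": "c7a1e0f4",
"metadata": {},
"source": [
"Amazon Keyspaces creates keyspaces and tables asynchronously, so the fixed `time.sleep()` calls above are a heuristic. A sturdier option is to poll the table status until it reports `ACTIVE`. The sketch below assumes the Amazon Keyspaces system table `system_schema_mcs.tables` and its `status` column; `wait_for_table_active` is a hypothetical helper that relies only on the driver session's `execute(query, parameters)` method and the result's `one()` method.\n",
"\n",
"```python\n",
"import time\n",
"\n",
"STATUS_QUERY = (\n",
"    \"SELECT status FROM system_schema_mcs.tables \"\n",
"    \"WHERE keyspace_name = %s AND table_name = %s\"\n",
")\n",
"\n",
"\n",
"def wait_for_table_active(session, keyspace, table, timeout=120.0, poll_interval=2.0):\n",
"    \"\"\"Poll Amazon Keyspaces until keyspace.table reports status ACTIVE.\"\"\"\n",
"    deadline = time.monotonic() + timeout\n",
"    while time.monotonic() < deadline:\n",
"        row = session.execute(STATUS_QUERY, (keyspace, table)).one()\n",
"        # No row yet means the table is not visible; otherwise check its status.\n",
"        if row is not None and row.status == \"ACTIVE\":\n",
"            return\n",
"        time.sleep(poll_interval)\n",
"    raise TimeoutError(f\"{keyspace}.{table} not ACTIVE after {timeout} seconds\")\n",
"\n",
"\n",
"# Assumed usage in this notebook:\n",
"# wait_for_table_active(session, keyspaces_schema, \"online_retail\")\n",
"```"
]
},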
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "ce87792a",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"## Loading Data\n",
|
||
|
|
"We read the Online Retail CSV file and insert its rows into the Keyspaces table."
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "2ce1813a",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Populate test data.\n",
|
||
|
|
"\n",
|
||
|
|
"# csv file name\n",
|
||
|
|
"filename = \"online_retail_II_20k.csv\"\n",
|
||
|
|
"ROW_LIMIT = 20000\n",
|
||
|
|
"PRINT_ROWS = 2000\n",
|
||
|
|
"\n",
|
||
|
|
"\n",
|
||
|
|
"# initializing the titles and rows list\n",
|
||
|
|
"fields = []\n",
|
||
|
|
"rows = []\n",
|
||
|
|
"\n",
|
||
|
|
"insert = (\n",
|
||
|
|
" \"INSERT INTO \"\n",
|
||
|
|
" + keyspaces_schema\n",
|
||
|
|
" + '.online_retail (\"invoice\",\"stock_code\",\"description\",\"quantity\",\"invoice_date\",\"price\",\"customer_id\",\"country\") VALUES (?,?,?,?,?,?,?,?);'\n",
|
||
|
|
")\n",
|
||
|
|
"\n",
|
||
|
|
"prepared = session.prepare(insert)\n",
|
||
|
|
"prepared.consistency_level = ConsistencyLevel.LOCAL_QUORUM\n",
|
||
|
|
"\n",
|
||
|
|
"\n",
|
||
|
|
"print(\"Start Loading\", ROW_LIMIT, \"rows into the table at\", datetime.now())\n",
|
||
|
|
"start_time = time.monotonic()\n",
|
||
|
|
"\n",
|
||
|
|
"# reading csv file.\n",
|
||
|
|
"with open(filename, \"r\", encoding=\"utf-8-sig\") as csvfile:\n",
|
||
|
|
" # creating a csv reader object\n",
|
||
|
|
" csvreader = csv.reader(csvfile)\n",
|
||
|
|
"\n",
|
||
|
|
" # extracting field names through first row\n",
|
||
|
|
" fields = next(csvreader)\n",
|
||
|
|
"\n",
|
||
|
|
" # extracting each data row one by one\n",
|
||
|
|
" # print(fields)\n",
|
||
|
|
" for row in csvreader:\n",
|
||
|
|
" try:\n",
|
||
|
|
" if (csvreader.line_num % PRINT_ROWS) == 0:\n",
|
||
|
|
" print(\"Rows so far: %d\" % (csvreader.line_num))\n",
|
||
|
|
" print(datetime.now())\n",
|
||
|
|
"\n",
|
||
|
|
" # print(row)\n",
|
||
|
|
" inv_date = datetime.strptime(row[4], \"%m/%d/%y %H:%M\")\n",
|
||
|
|
" # print(inv_date)\n",
|
||
|
|
" r = session.execute(\n",
|
||
|
|
" prepared,\n",
|
||
|
|
" (\n",
|
||
|
|
" str(row[0]),\n",
|
||
|
|
" str(row[1]),\n",
|
||
|
|
" str(row[2]),\n",
|
||
|
|
" int(row[3]),\n",
|
||
|
|
" inv_date,\n",
|
||
|
|
" float(row[5]),\n",
|
||
|
|
" str(row[6]),\n",
|
||
|
|
" str(row[7]),\n",
|
||
|
|
" ),\n",
|
||
|
|
" )\n",
|
||
|
|
"\n",
|
||
|
|
" if csvreader.line_num >= ROW_LIMIT:\n",
|
||
|
|
" break\n",
|
||
|
|
" except Exception as ex:\n",
|
||
|
|
" print(\"Error for row %d\" % (csvreader.line_num))\n",
|
||
|
|
" print(row)\n",
|
||
|
|
" print(ex)\n",
|
||
|
|
"\n",
|
||
|
|
" # get total number of rows\n",
|
||
|
|
" print(\"Total no. of rows: %d\" % (csvreader.line_num))\n",
|
||
|
|
"\n",
|
||
|
|
"end_time = time.monotonic()\n",
|
||
|
|
"print(\"Load time:\", timedelta(seconds=end_time - start_time), \"for\", ROW_LIMIT, \"rows\")"
|
||
|
|
]
|
||
|
|
},
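{
"cell_type": "markdown",
"id": "c7a1e0f5",
"metadata": {},
"source": [
"The loop above sends one synchronous `INSERT` per CSV row and converts the price with `float()`. One refinement you may consider is converting the price string directly with `decimal.Decimal`, which keeps the exact value written in the CSV for the `decimal` column, whereas a `float` can only approximate values such as 2.55. The sketch below shows the row-parsing step in isolation. `parse_row` is a hypothetical helper, and it assumes the same column order and date format as the loop above.\n",
"\n",
"```python\n",
"from datetime import date, datetime\n",
"from decimal import Decimal\n",
"\n",
"\n",
"def parse_row(row):\n",
"    \"\"\"Convert one CSV row into the parameter tuple for the prepared INSERT.\"\"\"\n",
"    invoice, stock_code, description, quantity, invoice_date, price, customer_id, country = row[:8]\n",
"    return (\n",
"        invoice,\n",
"        stock_code,\n",
"        description,\n",
"        int(quantity),\n",
"        # The date column stores only the calendar day.\n",
"        datetime.strptime(invoice_date, \"%m/%d/%y %H:%M\").date(),\n",
"        Decimal(price),  # exact decimal value instead of a binary float\n",
"        customer_id,\n",
"        country,\n",
"    )\n",
"\n",
"\n",
"# A made-up row in the same column order as the CSV file.\n",
"params = parse_row([\"INV001\", \"SKU1\", \"EXAMPLE ITEM\", \"2\", \"12/01/09 07:45\", \"2.55\", \"12345\", \"United Kingdom\"])\n",
"print(params)\n",
"```\n",
"\n",
"With the DataStax Python driver, `cassandra.concurrent.execute_concurrent_with_args(session, prepared, list_of_params, concurrency=...)` can then send such parameter tuples concurrently instead of one request at a time."
]
},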
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "e2dc6a6a",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"## ML Code"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "79e48035",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"Now that we have data in Keyspaces, let's read it into a pandas DataFrame. Once the data is in a DataFrame, you can clean it up so that it is ready for training the model."
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "433fbc4a",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Reading Data from Keyspaces\n",
|
||
|
|
"r = session.execute(\"select * from \" + keyspaces_schema + \".online_retail\")\n",
|
||
|
|
"\n",
|
||
|
|
"df = DataFrame(r)\n",
|
||
|
|
"df.head(100)"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "cdfa1734",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"In this example, we use CQL to read records from the Keyspaces table.\n",
"\n",
"In some ML use cases, you may need to read the same data from the same Keyspaces table multiple times. In that case, we recommend saving the data to an Amazon S3 bucket so that you don't incur additional costs from reading Amazon Keyspaces repeatedly. Depending on your scenario, you may also use Amazon EMR to ingest a very large Amazon S3 file into SageMaker."
|
||
|
|
]
|
||
|
|
},
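{
"cell_type": "markdown",
"id": "c7a1e0f6",
"metadata": {},
"source": [
"The caching recommendation above amounts to a read-through cache: reuse a saved copy of the data if one exists, and query Amazon Keyspaces only otherwise. A minimal sketch with a local CSV file standing in for the S3 object; `load_or_fetch` and `fetch_rows` are hypothetical names, and in practice you could point the cache at an S3 location instead. Note that values read back from CSV are strings, so column types need to be restored afterward.\n",
"\n",
"```python\n",
"import csv\n",
"import os\n",
"import tempfile\n",
"\n",
"\n",
"def load_or_fetch(cache_path, fetch_rows, header):\n",
"    \"\"\"Return rows from cache_path if it exists; otherwise fetch, cache, and return them.\"\"\"\n",
"    if os.path.exists(cache_path):\n",
"        with open(cache_path, newline=\"\", encoding=\"utf-8\") as f:\n",
"            reader = csv.reader(f)\n",
"            next(reader)  # skip the header row\n",
"            return [tuple(r) for r in reader]\n",
"    rows = [tuple(str(v) for v in r) for r in fetch_rows()]\n",
"    with open(cache_path, \"w\", newline=\"\", encoding=\"utf-8\") as f:\n",
"        writer = csv.writer(f)\n",
"        writer.writerow(header)\n",
"        writer.writerows(rows)\n",
"    return rows\n",
"\n",
"\n",
"calls = []\n",
"\n",
"\n",
"def fetch_rows():\n",
"    # Stand-in for a CQL query such as session.execute(\"select * from ...\").\n",
"    calls.append(1)\n",
"    return [(\"INV001\", 2), (\"INV002\", 5)]\n",
"\n",
"\n",
"path = os.path.join(tempfile.mkdtemp(), \"cache.csv\")\n",
"first = load_or_fetch(path, fetch_rows, [\"invoice\", \"quantity\"])\n",
"second = load_or_fetch(path, fetch_rows, [\"invoice\", \"quantity\"])\n",
"print(len(calls))  # 1: the second call is served from the cache\n",
"```"
]
},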
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "0e06c0f7",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Save the pandas DataFrame to S3 as CSV\n",
"import sagemaker\n",
"from io import StringIO\n",
"\n",
"sess = sagemaker.Session()\n",
"bucket = sess.default_bucket()  # SageMaker default bucket for this account and Region\n",
"print(bucket)\n",
"\n",
"csv_buffer = StringIO()\n",
"df.to_csv(csv_buffer)\n",
"s3_resource = boto3.resource(\"s3\")\n",
"s3_resource.Object(bucket, \"out/saved_online_retail.csv\").put(Body=csv_buffer.getvalue())"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "e7b822ce",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"In this example, we also group the data by Recency, Frequency, and Monetary value to build an RFM matrix. Our business objective for this dataset is to cluster customers using this metric, called RFM. The RFM model is based on three quantitative factors:\n",
|
||
|
|
"\n",
|
||
|
|
" - Recency: How recently a customer has made a purchase.\n",
|
||
|
|
" - Frequency: How often a customer makes a purchase.\n",
|
||
|
|
" - Monetary Value: How much money a customer spends on purchases.\n",
|
||
|
|
"\n",
|
||
|
|
"RFM analysis ranks each customer numerically in each of these three categories, generally on a scale of 1 to 5 (the higher the number, the better the result), so the best customers receive a top score in every category. We use pandas\u2019s quantile-based discretization function (`qcut`), which discretizes values into equal-sized buckets based on rank or on sample quantiles. At the end, each customer is also assigned a named segment, such as \"New Customers\", \"Hibernating\", or \"Promising\", based on their recency and frequency scores. "
|
||
|
|
]
|
||
|
|
},
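{
"cell_type": "markdown",
"id": "c7a1e0f7",
"metadata": {},
"source": [
"To make the three metrics concrete before the pandas version below, here is a stdlib-only sketch that computes, per customer, recency (days since the last purchase relative to a reference date), frequency (number of distinct invoices), and monetary value (total spend, quantity times price). The transactions are made up for illustration, and `rfm_table` is a hypothetical helper.\n",
"\n",
"```python\n",
"from collections import defaultdict\n",
"from datetime import date\n",
"\n",
"\n",
"def rfm_table(transactions, reference_date):\n",
"    \"\"\"transactions: iterable of (customer_id, invoice, invoice_date, quantity, price).\"\"\"\n",
"    last_seen = {}\n",
"    invoices = defaultdict(set)\n",
"    spend = defaultdict(float)\n",
"    for customer, invoice, day, quantity, price in transactions:\n",
"        last_seen[customer] = max(day, last_seen.get(customer, day))\n",
"        invoices[customer].add(invoice)  # a set counts each invoice once\n",
"        spend[customer] += quantity * price\n",
"    return {\n",
"        c: ((reference_date - last_seen[c]).days, len(invoices[c]), round(spend[c], 2))\n",
"        for c in last_seen\n",
"    }\n",
"\n",
"\n",
"tx = [\n",
"    (\"A\", \"I1\", date(2011, 12, 1), 2, 5.0),\n",
"    (\"A\", \"I1\", date(2011, 12, 1), 1, 3.0),\n",
"    (\"A\", \"I2\", date(2011, 12, 8), 4, 1.5),\n",
"    (\"B\", \"I3\", date(2011, 11, 9), 10, 2.0),\n",
"]\n",
"print(rfm_table(tx, date(2011, 12, 9)))\n",
"```"
]
},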
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "5199e64a",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Prepare Data\n",
|
||
|
|
"df.count()\n",
|
||
|
|
"df[\"description\"].nunique()\n",
|
||
|
|
"df[\"totalprice\"] = df[\"quantity\"] * df[\"price\"]\n",
|
||
|
|
"df.groupby(\"invoice\").agg({\"totalprice\": \"sum\"}).head()\n",
|
||
|
|
"\n",
|
||
|
|
"df.groupby(\"description\").agg({\"price\": \"max\"}).sort_values(\"price\", ascending=False).head()\n",
|
||
|
|
"df.sort_values(\"price\", ascending=False).head()\n",
|
||
|
|
"df[\"country\"].value_counts().head()\n",
|
||
|
|
"df.groupby(\"country\").agg({\"totalprice\": \"sum\"}).sort_values(\"totalprice\", ascending=False).head()\n",
|
||
|
|
"\n",
|
||
|
|
"returned = df[df[\"invoice\"].str.contains(\"C\", na=False)]\n",
|
||
|
|
"returned.sort_values(\"quantity\", ascending=True).head()\n",
|
||
|
|
"\n",
|
||
|
|
"df.isnull().sum()\n",
|
||
|
|
"df.dropna(inplace=True)\n",
|
||
|
|
"df.isnull().sum()\n",
|
||
|
|
"df.describe([0.05, 0.01, 0.25, 0.50, 0.75, 0.80, 0.90, 0.95, 0.99]).T\n",
|
||
|
|
"df.drop(df.loc[df[\"customer_id\"] == \"\"].index, inplace=True)\n",
|
||
|
|
"\n",
|
||
|
|
"# Recency Metric\n",
|
||
|
|
"import datetime as dt\n",
|
||
|
|
"\n",
|
||
|
|
"today_date = dt.date(2011, 12, 9)  # fixed reference date: the end of the period covered by the Online Retail II dataset\n",
|
||
|
|
"df[\"customer_id\"] = df[\"customer_id\"].astype(int)\n",
|
||
|
|
"\n",
|
||
|
|
"# Get the most recent invoice date for each customer\n",
|
||
|
|
"temp_df = df.groupby(\"customer_id\").agg({\"invoice_date\": \"max\"})\n",
|
||
|
|
"temp_df[\"invoice_date\"] = temp_df[\"invoice_date\"].astype(str)\n",
|
||
|
|
"temp_df[\"invoice_date\"] = pd.to_datetime(temp_df[\"invoice_date\"]).dt.date\n",
|
||
|
|
"temp_df[\"Recency\"] = (today_date - temp_df[\"invoice_date\"]).dt.days\n",
|
||
|
|
"recency_df = temp_df.drop(columns=[\"invoice_date\"])\n",
|
||
|
|
"recency_df.head()\n",
|
||
|
|
"\n",
|
||
|
|
"# Frequency Metric (number of distinct invoices per customer)\n",
|
||
|
|
"temp_df = df.groupby([\"customer_id\", \"invoice\"]).agg({\"invoice\": \"count\"})\n",
|
||
|
|
"freq_df = temp_df.groupby(\"customer_id\").agg({\"invoice\": \"count\"})\n",
|
||
|
|
"freq_df.rename(columns={\"invoice\": \"Frequency\"}, inplace=True)\n",
|
||
|
|
"\n",
|
||
|
|
"# Monetary Metric\n",
|
||
|
|
"monetary_df = df.groupby(\"customer_id\").agg({\"totalprice\": \"sum\"})\n",
|
||
|
|
"monetary_df.rename(columns={\"totalprice\": \"Monetary\"}, inplace=True)\n",
|
||
|
|
"rfm = pd.concat([recency_df, freq_df, monetary_df], axis=1)\n",
|
||
|
|
"\n",
|
||
|
|
"df = rfm\n",
|
||
|
|
"df[\"RecencyScore\"] = pd.qcut(df[\"Recency\"], 5, labels=[5, 4, 3, 2, 1])\n",
|
||
|
|
"df[\"FrequencyScore\"] = pd.qcut(df[\"Frequency\"].rank(method=\"first\"), 5, labels=[1, 2, 3, 4, 5])\n",
|
||
|
|
"df[\"Monetary\"] = df[\"Monetary\"].astype(int)\n",
|
||
|
|
"df[\"MonetaryScore\"] = pd.qcut(df[\"Monetary\"], 5, labels=[1, 2, 3, 4, 5])\n",
|
||
|
|
"df[\"RFM_SCORE\"] = (\n",
|
||
|
|
" df[\"RecencyScore\"].astype(str)\n",
|
||
|
|
" + df[\"FrequencyScore\"].astype(str)\n",
|
||
|
|
" + df[\"MonetaryScore\"].astype(str)\n",
|
||
|
|
")\n",
|
||
|
|
"seg_map = {\n",
|
||
|
|
" r\"[1-2][1-2]\": \"Hibernating\",\n",
|
||
|
|
" r\"[1-2][3-4]\": \"At Risk\",\n",
|
||
|
|
" r\"[1-2]5\": \"Can't Lose\",\n",
|
||
|
|
" r\"3[1-2]\": \"About to Sleep\",\n",
|
||
|
|
" r\"33\": \"Need Attention\",\n",
|
||
|
|
" r\"[3-4][4-5]\": \"Loyal Customers\",\n",
|
||
|
|
" r\"41\": \"Promising\",\n",
|
||
|
|
" r\"51\": \"New Customers\",\n",
|
||
|
|
" r\"[4-5][2-3]\": \"Potential Loyalists\",\n",
|
||
|
|
" r\"5[4-5]\": \"Champions\",\n",
|
||
|
|
"}\n",
|
||
|
|
"\n",
|
||
|
|
"df[\"Segment\"] = df[\"RecencyScore\"].astype(str) + rfm[\"FrequencyScore\"].astype(str)\n",
|
||
|
|
"df[\"Segment\"] = df[\"Segment\"].replace(seg_map, regex=True)\n",
|
||
|
|
"df.head()\n",
|
||
|
|
"rfm = df.loc[:, \"Recency\":\"Monetary\"]\n",
|
||
|
|
"df.groupby(\"customer_id\").agg({\"Segment\": \"sum\"}).head()"
|
||
|
|
]
|
||
|
|
},
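{
"cell_type": "markdown",
"id": "c7a1e0f8",
"metadata": {},
"source": [
"The `seg_map` patterns are applied to a two-character string made of the recency score followed by the frequency score. The stdlib sketch below performs the same lookup with `re.fullmatch`, so each pattern must match the whole string, and checks that each score pair maps to exactly one segment. `segment_for` is a hypothetical helper; its patterns mirror those in the cell above.\n",
"\n",
"```python\n",
"import re\n",
"\n",
"SEG_MAP = {\n",
"    r\"[1-2][1-2]\": \"Hibernating\",\n",
"    r\"[1-2][3-4]\": \"At Risk\",\n",
"    r\"[1-2]5\": \"Can't Lose\",\n",
"    r\"3[1-2]\": \"About to Sleep\",\n",
"    r\"33\": \"Need Attention\",\n",
"    r\"[3-4][4-5]\": \"Loyal Customers\",\n",
"    r\"41\": \"Promising\",\n",
"    r\"51\": \"New Customers\",\n",
"    r\"[4-5][2-3]\": \"Potential Loyalists\",\n",
"    r\"5[4-5]\": \"Champions\",\n",
"}\n",
"\n",
"\n",
"def segment_for(recency_score: int, frequency_score: int) -> str:\n",
"    \"\"\"Map an (R, F) score pair, each 1 to 5, to its named segment.\"\"\"\n",
"    key = f\"{recency_score}{frequency_score}\"\n",
"    matches = [name for pattern, name in SEG_MAP.items() if re.fullmatch(pattern, key)]\n",
"    if len(matches) != 1:\n",
"        raise ValueError(f\"expected exactly one segment for {key}, got {matches}\")\n",
"    return matches[0]\n",
"\n",
"\n",
"print(segment_for(5, 5), segment_for(1, 1), segment_for(5, 1))  # Champions Hibernating New Customers\n",
"```"
]
},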
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "9ad3ab64",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"Now that we have our final dataset, we can start our training. \n",
|
||
|
|
"\n",
|
||
|
|
"Here we do feature scaling with `MinMaxScaler()`, which scales and translates each feature individually so that it falls in a given range on the training set, between zero and one in our case.\n",
"\n",
"`fit()` learns the scaling parameters (the minimum and maximum of each feature) from the input data, and `transform()` applies them to rescale the data points. The `fit_transform()` method used below performs both steps in one call and is equivalent to `fit(X).transform(X)`.\n",
"\n",
"Next, the k-means algorithm groups customers into clusters based on their scaled RFM attributes. This cluster information (a.k.a. segments) can be used for targeted marketing campaigns."
|
||
|
|
]
|
||
|
|
},
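{
"cell_type": "markdown",
"id": "c7a1e0f9",
"metadata": {},
"source": [
"Min-max scaling maps each value x of a feature to (x - min) / (max - min), where min and max are learned during `fit()`. The stdlib sketch below applies the same arithmetic to a single feature column; it is an illustration, not scikit-learn's implementation, and the helper names are hypothetical. Handling a constant column by mapping it to 0.0 is this sketch's own choice.\n",
"\n",
"```python\n",
"def minmax_fit(values):\n",
"    \"\"\"Learn the scaling parameters: the column minimum and maximum.\"\"\"\n",
"    return min(values), max(values)\n",
"\n",
"\n",
"def minmax_transform(values, params):\n",
"    \"\"\"Apply (x - min) / (max - min); a constant column maps to 0.0.\"\"\"\n",
"    lo, hi = params\n",
"    span = hi - lo\n",
"    return [(v - lo) / span if span else 0.0 for v in values]\n",
"\n",
"\n",
"recency = [2, 4, 6, 10]\n",
"params = minmax_fit(recency)\n",
"print(minmax_transform(recency, params))  # [0.0, 0.25, 0.5, 1.0]\n",
"print(minmax_transform([8], params))  # new data reuses the fitted min/max: [0.75]\n",
"```"
]
},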
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "ea020e9f",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Training\n",
|
||
|
|
"\n",
|
||
|
|
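"sc = MinMaxScaler((0, 1))\n",
"rfm_scaled = sc.fit_transform(rfm)  # Recency, Frequency, Monetary scaled to [0, 1]\n",
"\n",
"# Clustering (n_init and random_state set explicitly so results are reproducible)\n",
"kmeans = KMeans(n_clusters=6, n_init=10, random_state=42).fit(rfm_scaled)\n",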
|
||
|
|
"\n",
|
||
|
|
"# Result\n",
|
||
|
|
"segment = kmeans.labels_"
|
||
|
|
]
|
||
|
|
},
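{
"cell_type": "markdown",
"id": "c7a1e0fa",
"metadata": {},
"source": [
"K-means (Lloyd's algorithm) alternates two steps until the assignments stop changing: assign each point to its nearest centroid, then move each centroid to the mean of its assigned points. scikit-learn's `KMeans` adds smarter initialization (k-means++ by default) and multiple restarts (`n_init`). For illustration only, the stdlib sketch below uses the first `k` points as the initial centroids; `kmeans` is a hypothetical helper, not the scikit-learn implementation.\n",
"\n",
"```python\n",
"def kmeans(points, k, max_iter=100):\n",
"    \"\"\"Plain Lloyd's algorithm on a list of equal-length numeric tuples.\"\"\"\n",
"    centroids = [list(p) for p in points[:k]]  # naive initialization\n",
"    labels = None\n",
"    for _ in range(max_iter):\n",
"        # Assignment step: nearest centroid by squared Euclidean distance.\n",
"        new_labels = [\n",
"            min(range(k), key=lambda j: sum((a - b) ** 2 for a, b in zip(p, centroids[j])))\n",
"            for p in points\n",
"        ]\n",
"        if new_labels == labels:\n",
"            break  # converged: no point changed cluster\n",
"        labels = new_labels\n",
"        # Update step: move each centroid to the mean of its assigned points.\n",
"        for j in range(k):\n",
"            members = [p for p, lab in zip(points, labels) if lab == j]\n",
"            if members:\n",
"                centroids[j] = [sum(col) / len(members) for col in zip(*members)]\n",
"    return labels, centroids\n",
"\n",
"\n",
"pts = [(0.0, 0.0), (0.0, 0.1), (1.0, 1.0), (0.9, 1.0)]\n",
"labels, centers = kmeans(pts, 2)\n",
"print(labels)  # [0, 0, 1, 1]\n",
"```"
]
},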
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "46ca68e4",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"Let\u2019s visualize the data to see how records are distributed in different clusters."
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "e1258e4a",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Visualize the clusters\n",
|
||
|
|
"import matplotlib.pyplot as plt\n",
|
||
|
|
"\n",
|
||
|
|
"final_df = pd.DataFrame({\"customer_id\": rfm.index, \"Segment\": segment})\n",
|
||
|
|
"bucket_data = final_df.groupby(\"Segment\").agg({\"customer_id\": \"count\"})  # customers per cluster, all 6 clusters\n",
"index_data = final_df.groupby(\"Segment\").agg({\"Segment\": \"max\"})\n",
|
||
|
|
"index_data[\"Segment\"] = index_data[\"Segment\"].astype(int)\n",
|
||
|
|
"dataFrame = pd.DataFrame(data=bucket_data[\"customer_id\"], index=index_data[\"Segment\"])\n",
|
||
|
|
"dataFrame.rename(columns={\"customer_id\": \"Total Customers\"}).plot.bar(\n",
|
||
|
|
" rot=70, title=\"RFM clustering\"\n",
|
||
|
|
")\n",
|
||
|
|
"plt.show(block=True);"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "e822c021",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"Next, we save the number of customers in each segment identified by the ML model back to an Amazon Keyspaces table for targeted marketing. A batch job could read this data and run targeted campaigns for customers in specific segments."
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "bc74f01d",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"# Create ml_clustering_results table to store the results\n",
|
||
|
|
"createTable = \"\"\"CREATE TABLE IF NOT EXISTS %s.ml_clustering_results ( \n",
|
||
|
|
" run_id text,\n",
|
||
|
|
" segment int,\n",
|
||
|
|
" total_customers int,\n",
|
||
|
|
" run_date date,\n",
|
||
|
|
" PRIMARY KEY (run_id, segment));\n",
|
||
|
|
"\"\"\"\n",
|
||
|
|
"cr = session.execute(createTable % keyspaces_schema)\n",
|
||
|
|
"time.sleep(20)\n",
|
||
|
|
"print(\"Table 'ml_clustering_results' created\")\n",
|
||
|
|
"\n",
|
||
|
|
"insert_ml = (\n",
|
||
|
|
" \"INSERT INTO \"\n",
|
||
|
|
" + keyspaces_schema\n",
|
||
|
|
" + \".ml_clustering_results\"\n",
|
||
|
|
" + '(\"run_id\",\"segment\",\"total_customers\",\"run_date\") '\n",
|
||
|
|
" + \"VALUES (?,?,?,?); \"\n",
|
||
|
|
")\n",
|
||
|
|
"\n",
|
||
|
|
"prepared = session.prepare(insert_ml)\n",
|
||
|
|
"prepared.consistency_level = ConsistencyLevel.LOCAL_QUORUM\n",
|
||
|
|
"\n",
|
||
|
|
"run_id = \"101\"\n",
|
||
|
|
"dt = datetime.now()\n",
|
||
|
|
"\n",
|
||
|
|
"for ind in dataFrame.index:\n",
|
||
|
|
" print(ind, dataFrame[\"customer_id\"][ind])\n",
|
||
|
|
" r = session.execute(\n",
|
||
|
|
" prepared,\n",
|
||
|
|
" (\n",
|
||
|
|
" run_id,\n",
|
||
|
|
" ind,\n",
|
||
|
|
" dataFrame[\"customer_id\"][ind],\n",
|
||
|
|
" dt,\n",
|
||
|
|
" ),\n",
|
||
|
|
" )"
|
||
|
|
]
|
||
|
|
},
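{
"cell_type": "markdown",
"id": "c7a1e0fb",
"metadata": {},
"source": [
"A downstream batch job, as described above, could read one run's results back by the partition key `run_id`, which makes it a single-partition query. A sketch assuming the `ml_clustering_results` table from the previous cell; `segments_for_run` is a hypothetical helper. It relies only on the driver session's `execute(query, parameters)` method and on rows exposing columns as attributes, as the driver's default named-tuple rows do.\n",
"\n",
"```python\n",
"RESULTS_QUERY = \"SELECT segment, total_customers FROM {ks}.ml_clustering_results WHERE run_id = %s\"\n",
"\n",
"\n",
"def segments_for_run(session, keyspace, run_id):\n",
"    \"\"\"Return {segment: total_customers} for one clustering run.\"\"\"\n",
"    rows = session.execute(RESULTS_QUERY.format(ks=keyspace), (run_id,))\n",
"    return {row.segment: row.total_customers for row in rows}\n",
"\n",
"\n",
"# Assumed usage in this notebook:\n",
"# counts = segments_for_run(session, keyspaces_schema, run_id)\n",
"```"
]
},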
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "2bd56924",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"## Cleanup \n",
|
||
|
|
"Finally, we clean up the resources created during this tutorial to avoid incurring additional charges. In this step, we drop the keyspace, which also deletes all of its tables."
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "code",
|
||
|
|
"execution_count": null,
|
||
|
|
"id": "3b76a063",
|
||
|
|
"metadata": {},
|
||
|
|
"outputs": [],
|
||
|
|
"source": [
|
||
|
|
"deleteKeyspace = \"DROP KEYSPACE IF EXISTS \" + keyspaces_schema\n",
|
||
|
|
"dr = session.execute(deleteKeyspace)\n",
|
||
|
|
"time.sleep(5)\n",
|
||
|
|
"print(\n",
|
||
|
|
" \"Dropping %s keyspace. It may take a few seconds to a minute to complete deletion of keyspace and table.\"\n",
|
||
|
|
" % keyspaces_schema\n",
|
||
|
|
")"
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"id": "039a2999",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"It may take a few seconds to a minute to complete the deletion of keyspace and tables. When you delete a keyspace, the keyspace and all of its tables are deleted and you stop accruing charges from them.\n",
|
||
|
|
"\n",
|
||
|
|
"## Conclusion\n",
|
||
|
|
"This notebook showed how to ingest customer data from Amazon Keyspaces into SageMaker and train a clustering model that segments customers. You could use these segments for targeted marketing to improve your business KPIs."
|
||
|
|
]
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"attachments": {},
|
||
|
|
"cell_type": "markdown",
|
||
|
|
"metadata": {},
|
||
|
|
"source": [
|
||
|
|
"## Notebook CI Test Results\n",
|
||
|
|
"\n",
|
||
|
|
"This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.\n",
|
||
|
|
"\n"
|
||
|
|
]
|
||
|
|
}
|
||
|
|
],
|
||
|
|
"metadata": {
|
||
|
|
"instance_type": "ml.t3.medium",
|
||
|
|
"kernelspec": {
|
||
|
|
"display_name": "conda_python3",
|
||
|
|
"language": "python",
|
||
|
|
"name": "conda_python3"
|
||
|
|
},
|
||
|
|
"language_info": {
|
||
|
|
"codemirror_mode": {
|
||
|
|
"name": "ipython",
|
||
|
|
"version": 3
|
||
|
|
},
|
||
|
|
"file_extension": ".py",
|
||
|
|
"mimetype": "text/x-python",
|
||
|
|
"name": "python",
|
||
|
|
"nbconvert_exporter": "python",
|
||
|
|
"pygments_lexer": "ipython3",
|
||
|
|
"version": "3.8.12"
|
||
|
|
}
|
||
|
|
},
|
||
|
|
"nbformat": 4,
|
||
|
|
"nbformat_minor": 5
|
||
|
|
}
|